package com.intct.util;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.java.utils.ParameterTool;

/**
 * Helper for building Flink CDC {@link MySqlSource} instances from the
 * configuration exposed by {@code FromPropertiesFileUtil.parameterTool}.
 *
 * @author gufg
 * @since 2025-07-27 10:47
 */
public class MySqlUtil {
    /**
     * Builds a {@link MySqlSource} whose records are deserialized to {@code String}
     * by {@link JsonDebeziumDeserializationSchema}.
     *
     * <p>Connection settings are read from the following configuration keys; the
     * listed fallback is used when a key is absent:
     * <ul>
     *   <li>{@code mysql-host} (fallback {@code cdh-node})</li>
     *   <li>{@code mysql-port} (fallback {@code 13306})</li>
     *   <li>{@code mysql-username} (fallback {@code root})</li>
     *   <li>{@code mysql-password} (has a hard-coded fallback, see note in the body)</li>
     *   <li>{@code mysql-db} (fallback {@code travel})</li>
     *   <li>{@code mysql-table} (fallback: empty string)</li>
     * </ul>
     *
     * @param startupOptions startup mode handed unchanged to the source builder
     * @return the configured MySQL source
     */
    public static MySqlSource<String> getMysqlSource(StartupOptions startupOptions) {
        // Load the shared configuration.
        ParameterTool config = FromPropertiesFileUtil.parameterTool;

        String host = config.get("mysql-host", "cdh-node");
        int port = config.getInt("mysql-port", 13306);
        String username = config.get("mysql-username", "root");
        // NOTE(review): a fallback password is hard-coded in source. It is kept so
        // existing deployments keep working, but it should be supplied via
        // configuration only and this default removed.
        String password = config.get("mysql-password", "Test_090110");
        String database = config.get("mysql-db", "travel");
        // NOTE(review): the fallback is an empty table list — confirm that the
        // connector treats this as intended when "mysql-table" is not configured.
        String tables = config.get("mysql-table", "");

        // Configure the MySQL source.
        return MySqlSource.<String>builder()
                .hostname(host)
                .port(port)
                .username(username)
                .password(password)
                .databaseList(database)
                .tableList(tables)
                .startupOptions(startupOptions)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
